Collaborative Filtering using Spark

TO-DO

  • Running collaborative filtering using MLlib on Spark
    • Using implicit feedback, since we have no direct input from the users about their preferences (see the trainImplicit sketch after the local example)
  • Running on multiple Dataproc clusters
  • Loading the data set from BigQuery (see the connector sketch at the end of this notebook)
  • Writing the output into BigQuery

Run collaborative filtering using MLlib - LOCAL


In [99]:
import os
import sys

# Use an existing SPARK_HOME if one is set, otherwise fall back to the local install
spark_home = os.environ.setdefault(
    'SPARK_HOME',
    '/Users/ozimmer/GoogleDrive/berkeley/w261/spark-2.0.0-bin-hadoop2.6')
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make pyspark and py4j importable, then launch the interactive shell
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.9-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:43:17)
SparkSession available as 'spark'.

In [109]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# Load and parse the data
data = sc.textFile("/Users/ozimmer/GoogleDrive/berkeley/w210/w210_vendor_recommendor/test_spark_1.csv")
header = data.first()  # grab the header row so it can be filtered out below

ratings = data.filter(lambda row: row != header)\
    .map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")


Mean Squared Error = 2670.12689742
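
The TO-DO calls for implicit feedback, while the cell above trains on explicit ratings. MLlib exposes an implicit-feedback variant through ALS.trainImplicit, which treats the rating column as a confidence weight rather than a preference score. A minimal sketch reusing the ratings RDD above; the alpha value is an illustrative default, not a tuned choice:

In [ ]:
# Train on implicit feedback: each rating is read as confidence that the
# user interacted with the product, not as an explicit preference score
implicit_model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)

# Predictions are now relative preference scores rather than rating estimates
implicit_model.predictAll(testdata).take(5)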

In [134]:
# Create an RDD of (user, product) pairs to score
data = [(145, 895988), (143, 348288), (143, 795270), (143, 795221), (143, 306804)]
data_rdd = sc.parallelize(data)

# Pass the pairs to the trained model for batch prediction
model.predictAll(data_rdd).collect()


Out[134]:
[Rating(user=143, product=348288, rating=469886.4711073065),
 Rating(user=143, product=306804, rating=-47573.151232822696),
 Rating(user=143, product=795221, rating=-57848.51871782569),
 Rating(user=143, product=795270, rating=-75898.31224407976)]
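
Note that only four of the five requested pairs come back: predictAll drops pairs whose user or product has no latent factors in the model, so the (145, 895988) pair is presumably missing because user 145 does not appear in the training data.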

Non-RDD Example


In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=long(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
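
The recommendation DataFrames pair each id with an array of (id, rating) structs; a quick way to inspect them:

In [ ]:
# Each row: a userId (or movieId) plus its 10 highest-scoring counterparts
userRecs.show(5, truncate=False)
movieRecs.show(5, truncate=False)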

Dataproc - submit a job


In [5]:
!gsutil cp gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py .
!cat hello-world.py


Copying gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py...
/ [1 files][  147.0 B/  147.0 B]                                                
Operation completed over 1 objects/147.0 B.                                      
#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!'])
words = sorted(rdd.collect())
print words


In [6]:
!gcloud dataproc jobs submit pyspark --cluster test1 hello-world.py


Copying file://hello-world.py [Content-Type=text/x-python]...
-
Operation completed over 1 objects/147.0 B.                                      
Job [a22b76e0-3d65-4c5e-85fb-c96fec436e12] submitted.
Waiting for job output...
17/07/15 20:31:39 INFO org.spark_project.jetty.util.log: Logging initialized @2226ms
17/07/15 20:31:39 INFO org.spark_project.jetty.server.Server: jetty-9.2.z-SNAPSHOT
17/07/15 20:31:39 INFO org.spark_project.jetty.server.ServerConnector: Started ServerConnector@7fca9698{HTTP/1.1}{0.0.0.0:4040}
17/07/15 20:31:39 INFO org.spark_project.jetty.server.Server: Started @2353ms
17/07/15 20:31:39 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.1-hadoop2
17/07/15 20:31:40 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at test1-m/10.142.0.2:8032
17/07/15 20:31:41 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1500148538199_0002
['Hello,', 'world!']
17/07/15 20:31:51 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@7fca9698{HTTP/1.1}{0.0.0.0:4040}
Job [a22b76e0-3d65-4c5e-85fb-c96fec436e12] finished successfully.
driverControlFilesUri: gs://dataproc-be4b433b-932d-4f29-842e-d0e896f67823-us/google-cloud-dataproc-metainfo/5241cd03-b0af-4013-8dac-6a252719650d/jobs/a22b76e0-3d65-4c5e-85fb-c96fec436e12/
driverOutputResourceUri: gs://dataproc-be4b433b-932d-4f29-842e-d0e896f67823-us/google-cloud-dataproc-metainfo/5241cd03-b0af-4013-8dac-6a252719650d/jobs/a22b76e0-3d65-4c5e-85fb-c96fec436e12/driveroutput
placement:
  clusterName: test1
  clusterUuid: 5241cd03-b0af-4013-8dac-6a252719650d
pysparkJob:
  loggingConfig: {}
  mainPythonFileUri: gs://dataproc-be4b433b-932d-4f29-842e-d0e896f67823-us/google-cloud-dataproc-metainfo/5241cd03-b0af-4013-8dac-6a252719650d/jobs/a22b76e0-3d65-4c5e-85fb-c96fec436e12/staging/hello-world.py
reference:
  jobId: a22b76e0-3d65-4c5e-85fb-c96fec436e12
  projectId: fiery-set-171213
status:
  state: DONE
  stateStartTime: '2017-07-15T20:31:56.201Z'
statusHistory:
- state: PENDING
  stateStartTime: '2017-07-15T20:31:33.866Z'
- state: SETUP_DONE
  stateStartTime: '2017-07-15T20:31:34.368Z'
- details: Agent reported job success
  state: RUNNING
  stateStartTime: '2017-07-15T20:31:34.853Z'
yarnApplications:
- name: hello-world.py
  progress: 1.0
  state: FINISHED
  trackingUrl: http://test1-m:8088/proxy/application_1500148538199_0002/
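
The remaining TO-DO items, loading from and writing to BigQuery, are not implemented yet. Dataproc clusters ship with the Hadoop BigQuery connector, so one plausible approach is to read the ratings table into an RDD with newAPIHadoopRDD inside a submitted job. A sketch under that assumption; the bucket, dataset, table, and column names below are placeholders, not real resources:

In [ ]:
import json
import pyspark

sc = pyspark.SparkContext()

# BigQuery connector configuration; the GCS bucket holds temporary export files
conf = {
    'mapred.bq.project.id': 'fiery-set-171213',
    'mapred.bq.gcs.bucket': 'my-staging-bucket',                 # placeholder
    'mapred.bq.temp.gcs.path': 'gs://my-staging-bucket/tmp/bq',  # placeholder
    'mapred.bq.input.project.id': 'fiery-set-171213',
    'mapred.bq.input.dataset.id': 'recommender',                 # placeholder
    'mapred.bq.input.table.id': 'ratings',                       # placeholder
}

# Each record arrives as (row index, JSON string of the row)
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Parse into (user, item, score) tuples for ALS; the column names are assumed
ratings = table_data.map(lambda kv: json.loads(kv[1])) \
    .map(lambda row: (int(row['user']), int(row['item']), float(row['score'])))

For the write path, the usual pattern with this connector is to stage results as JSON on GCS and load them with bq load (or use saveAsNewAPIHadoopDataset with the connector's output format); that step is left for the actual implementation.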